ShowTable of Contents
Overview
The Lotus Expeditor micro broker component is a small message broker that provides a messaging fabric for integrating various parts of a solution such as applications and small devices located at the edge of the network. A message broker ensures that messages arrive at the correct destination and are transformed to the format required by each destination. The following article builds upon examples within the wiki and highlights some of the common components of the micro broker feature.
Micro broker
At its most basic level, micro broker provides an ability to define local queues and topics subject to message reception via external clients. Messages arriving on micro broker are subject to the requirements of the developer. For example, messages may be retrieved, inspected, and acted upon by custom code or the micro broker topology may be integrated into larger solutions using products such as WebSphere MQ.
A working example of micro broker exists in the samples delivered with the Expeditor Desktop Client:
- com.ibm.rcp.samples.microbroker.setup
- com.ibm.rcp.samples.microbroker.jmspubsub
Persistence
Persistence allows messages to be stored on varying levels of persistence. For example, a message intended from micro broker to a remote destination may be store in memory and subsequently resent until message delivery is successful. A more robust method of persistence is storage to disk allowing messages to be sent to the destination in the event of power failures or network interruptions.
The com.ibm.rcp.samples.microbroker.setup contains code that allows a developer to add the persistence level to the broker.
BrokerDefinition brokerDef = brokerFactoryService.createBrokerDefinition( BROKER_NAME );
brokerDef.setDataDirectory( computePath() );
PersistenceDefinition persistenceDef = brokerDef.getPersistenceDefinition();
persistenceDef.setPersistenceType( PersistenceDefinition.FULL_PERSISTENCE );
brokerDef.setPersistenceDefinition( persistenceDef );
broker = brokerFactoryService.create( brokerDef );
Trace
Unlike most Expeditor plugins, micro broker writes trace to a separate data file rather than the OSGI console. The following code enables trace for a locally running micro broker and sets the trace file to a location using the plugin's Activator class.
public void start(BundleContext context) throws Exception {
super.start(context);
plugin = this;
LocalBroker broker = BrokerFactory.INSTANCE.getByName(BROKER_NAME);
if (broker != null && !broker.isRunning()) {
broker.start();
}
String traceFile = enableTrace(broker, Platform.getStateLocation(
context.getBundle()).toOSString(), Trace.TRACE_LEVEL_MAX);
if (traceFile != null) {
context.registerService(CommandProvider.class.getName(),
new BrokerTrace(traceFile), null);
} else {
System.out.println("Unable to enable trace");
}
}
MQTTClient
Assuming that a micro broker is running, how does one send data to it? This function would be provided by the aforementioned "clients". If Expeditor Integrator is deployed, a micro broker is configured automatically and run when the Integrator server is started. An MqttClient provides an easy means of sending data to point-to-point locations such as a queue or topic defined by the Integrator server's micro broker.
The following code demonstrates all that is needed to send messages to micro broker.
public static void send(String server, String destination, boolean isQueue,
String payload, MqttProperties headers) throws MqttException {
// connect to microbroker
MqttClient client = new MqttClient(server, MqttSender.class.getName());
client.connect();
MqttDestination dest = null;
// create queue
if (isQueue) {
dest = client.getQueue(destination);
} else {
dest = client.getTopic(destination);
}
// create message
MqttMessage message = new MqttMessage(payload);
// QoS is the fastest, but should only be used for messages which are
// not valuable
message.setQos(0);
// additional headers
if (headers != null) {
message.setProperties(headers);
}
// send the message
System.out.println("Sending ...");
MqttDeliveryToken token = dest.send(message);
System.out.println("Message received " + token.isComplete());
// disconnect from microbroker
client.disconnect();
}
A developer can then send messages to Integrator by simply doing the following.
String server = "tcp://mq.atlanta.ibm.com:1883";
// send a message using MQTT
MqttSender.send(server, "XPDinteg_ResOutQ", true, "MQTT Sample Message",
null);
// restart an Integrator server via a control message
MqttProperties headers = new MqttProperties();
headers.put("Command", "restart");
headers.put("Param", "console");
headers.put("MessagePurpose", "PlatformRestart");
headers.put("StoreId", "Store500"); // must match the ID in XPDinteg.xml
MqttSender.send(server, "XPDinteg_CtrlQ", true, "", headers);
Bridge
The micro broker bridge provides a mechanism for connecting various "back-ends". For example, a micro broker may be bridge to another micro broker or to a WebSphere MQ server. In these cases, the messages sent to one micro broker may be forwarded via the bridge to the back-end and vice versa. This is very similar in function to Expeditor Integrator. The bridge provides a mechanism to connect the edge of network, transform or process messages coming from the edge, and forward them to the back-end.
The creation of a bridge can be broken down into the following parts:
- Obtain the micro broker
- Create the pipe that acts as the medium to the back-end
- Configure flows on the pipe - i.e. how messages move from micro broker to back-end and vice versa
- Add the pipe to the bridge
- Start the bridge
The process can be illustrated using the following code.
BrokerFactory factory = BrokerFactory.INSTANCE;
try {
bridge = factory.getByName(BROKER_NAME).getBridge();
// create a local microbroker queue for local to remote bridging
createLocalQueue(factory.getByName(BROKER_NAME));
// create the pipe to WebSphere MQ
PipeDefinition pd = createMqConnection();
// configure bridge flows
configure(pd);
// Add the pipe to the bridge
bridge.addPipe(pd);
// open the pipe
bridge.startAllPipes();
} catch (Exception e) {
e.printStackTrace();
}
And each helping method demonstrated here.
private void createLocalQueue(LocalBroker broker) {
try {
assert(broker.isRunning() == true);
String local = "LocaluBQueue";
MessagingEngine me = broker.getMessagingEngine();
com.ibm.micro.admin.QueueDefinition qd = me
.createQueueDefinition(local);
qd.setName(local);
qd.setMaximumDepth(1000);
me.createStaticQueue(qd);
} catch (Exception e) {
e.printStackTrace();
}
}
private PipeDefinition createMqConnection() {
PipeDefinition pd = null;
try {
pd = bridge.createPipeDefinition("Support-Pipe");
MQJMSConnectionDefinition connector = bridge
.createMQJMSConnectionDefinition("Support-Pipe-Pipe");
connector.setHost("mq.atlanta.ibm.com");
connector.setPort(1414);
connector.setChannel("CHAN2"); // server channel
connector.setQueueManager("QM_xpd_intg");
connector.setUserName("Administrator");
connector.setPassword("password");
connector.setSyncQName("XPDINTEG.SyncQ500");
// Set the connection on the pipe definition
pd.setConnection(connector);
} catch (Exception e) {
e.printStackTrace();
}
return pd;
}
private void configure(PipeDefinition pd) {
assert (pd != null);
try {
// MQ to local topic
FlowDefinition fd1 = bridge.createFlowDefinition("flow1");
fd1.setSources(new QueueDefinition[] { bridge
.createQueueDefinition("XPDINTEG.ReqOutQ500") });
fd1.setQos(FlowDefinition.DEFAULT_QOS);
fd1.setTarget(bridge.createTopicDefinition("SamplesTopic"));
pd.addInboundFlow(fd1);
// Local Microbroker queue to remote MQ queue
FlowDefinition fd2 = bridge.createFlowDefinition("flow2");
fd2.setSources(new QueueDefinition[] { bridge
.createQueueDefinition("LocaluBQueue") });
fd2.setQos(FlowDefinition.DEFAULT_QOS);
fd2.setTarget(bridge.createQueueDefinition("XPDINTEG.ResInQ500"));
pd.addOutboundFlow(fd2);
// add tranformation
// NOTE this must occur before adding the pipe to the bridge
// SampleTranformation.getInstance().addTransformation(bridge, pd,
// fd1, true, true);
} catch (Exception e) {
e.printStackTrace();
}
}
The flows allow developers to place messages (using the MqttClient shown above) on either a topic or the locally created queue. Should a message be placed on the LocaluBQueue, the message is automatically transferred to the XPDINTEG.ResInQ500 defined on WebSphere MQ. Similar events occur with the SamplesTopic - bridging a WebSphere MQ queue an the local topic. The topic can be created using the sample com.ibm.rcp.samples.microbroker.jmspubsub.
The above code comments on creation of a transformation added to the bridge. This can be done to convert messages on arrival. For example, a developer may need to change a message header value depending on custom logic contained in the transformer.
Conclusion
This article only provides a high level overview of micro broker and the concepts of messaging. There are additional ways to send data to micro broker using MessageProducer as well as more fine grained concepts such as usage of JNDI to send messages. Expeditor samples and the wiki provide details on both, and exploring either along with the above code should be sufficient for most developers to begin creating messaging plugins.